package com.xiaofan.ct.common.bean;

import com.xiaofan.ct.common.api.Column;
import com.xiaofan.ct.common.api.RowKey;
import com.xiaofan.ct.common.api.TableRef;
import com.xiaofan.ct.common.constant.Names;
import com.xiaofan.ct.common.constant.ValueConstant;
import com.xiaofan.ct.common.util.DateUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.*;

/**
 * 基础数据访问对象
 */
public abstract class BaseDao {

    private ThreadLocal<Connection> connHolder = new ThreadLocal<>();
    private ThreadLocal<Admin> adminHolder = new ThreadLocal<>();


    protected void start() throws IOException {
        getConnection();
        getAdmin();
    }

    protected void end() throws IOException {
        Admin admin = adminHolder.get();
        if (admin != null) {
            admin.close();
            adminHolder.remove();
        }

        Connection conn = connHolder.get();
        if (conn != null) {
            conn.close();
            connHolder.remove();
        }

    }

    /**
     * 创建表，如果存在，则删除后重新创建
     *
     * @param name
     * @param families
     */
    protected void createTableXX(String name, String... families) throws IOException {
        createTableXX(name, null, null, families);
    }

    protected void createTableXX(String name, String coProcessClass, Integer regionCount, String... families) throws IOException {
        Admin admin = getAdmin();

        TableName tableName = TableName.valueOf(name);

        if (admin.tableExists(tableName)) {
            deleteTable(name);
        }

        // 创建表
        createTable(name, coProcessClass,regionCount, families);
    }


    private void createTable(String name, String coProcessClass, Integer regionCount, String... families) throws IOException {
        Admin admin = getAdmin();

        TableName tableName = TableName.valueOf(name);

        HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);

        if (families == null || families.length == 0) {
            families = new String[1];
            families[0] = Names.CF_INFO.getValue();
        }
        for (String family : families) {
            HColumnDescriptor columnDescriptor = new HColumnDescriptor(family);
            tableDescriptor.addFamily(columnDescriptor);
        }

        if (coProcessClass != null && !"".equals(coProcessClass)) {
            tableDescriptor.addCoprocessor(coProcessClass);
        }

        // 增加预分区
        if (regionCount == null || regionCount <= 1) {
            admin.createTable(tableDescriptor);
        } else {
            // 分区键
            byte[][] splitKeys = genSplitKey(regionCount);

            admin.createTable(tableDescriptor, splitKeys);
        }

    }


    /**
     * 获取查询时startRow、stopRow集合
     */
    protected List<String[]> getStartStopRowKeys(String tel, String start, String end) {
        ArrayList<String[]> rowKeys = new ArrayList<>();

        String startTime = start.substring(0, 6);
        String endTime = end.substring(0, 6);

        Calendar startCal = Calendar.getInstance();
        startCal.setTime(DateUtil.parse(startTime, "yyyyMM"));

        Calendar endCal = Calendar.getInstance();
        endCal.setTime(DateUtil.parse(endTime, "yyyyMM"));

        while (startCal.getTimeInMillis() <= endCal.getTimeInMillis()) {

            String nowTime = DateUtil.format(startCal.getTime(), "yyyyMM");

            int regionNum = genRegionNum(tel, nowTime);

            // 1_183_202101 ~ 1_183_202101|
            String startRow = regionNum + "_" + tel + "_" + nowTime;
            String stopRow = startRow + "|";

            String[] rkArr = {startRow, stopRow};
            rowKeys.add(rkArr);

            startCal.add(Calendar.MONTH, 1);
        }

        return rowKeys;
    }


    /**
     * 计算分区号(0, 1, 2)
     *
     * @param tel
     * @param date
     * @return
     */
    protected int genRegionNum(String tel, String date) {
        // 18310629526
        String userCode = tel.substring(tel.length() - 4);
        // 20210513
        String yearMonth = date.substring(0, 6);

        int userCodeHash = userCode.hashCode();
        int yearMonthHash = yearMonth.hashCode();

        // crc 校验采用异或算法 hash
        int crc = Math.abs(userCodeHash ^ yearMonthHash);

        int regionNum = crc % ValueConstant.REGION_COUNT;

        return regionNum;
    }

    /**
     * 生成分区键
     */
    private byte[][] genSplitKey(Integer regionCount) {
        int splitKeyCount = regionCount - 1;
        byte[][] bs = new byte[splitKeyCount][];
        // 0|, 1|, 2|, 3|, 4|
        // (-∞, 0|),(0|, 1|), (1|, +∞)      字符串的比较
        List<byte[]> bsList = new ArrayList<>();

        for (int i = 0; i < splitKeyCount; i++) {
            String splitKey = i + "|";
            System.out.println(splitKey);
            bsList.add(Bytes.toBytes(splitKey));
        }

        // 排序, 这里本身就是有序的了，无需在排序
        // Collections.sort(bsList, new Bytes.ByteArrayComparator());
        bsList.toArray(bs);

        return bs;
    }

    /**
     * 增加对象，自动封装数据，将对象数据直接保存到hbase中
     *
     * @param obj
     */
    protected void putData(Object obj) throws IOException, IllegalAccessException {

        Class<?> clazz = obj.getClass();
        TableRef tableRef = clazz.getAnnotation(TableRef.class);
        String tableName = tableRef.value();

        Field[] fields = clazz.getDeclaredFields();
        String rowKey = "";
        for (Field field : fields) {
            RowKey rk = field.getAnnotation(RowKey.class);
            if (rk != null) {
                field.setAccessible(true);
                rowKey = (String) field.get(obj);
                break;
            }
        }
        // 获取表对象
        Connection conn = getConnection();
        Table table = conn.getTable(TableName.valueOf(tableName));
        Put put = new Put(Bytes.toBytes(rowKey));

        for (Field field : fields) {
            Column column = field.getAnnotation(Column.class);
            if (column != null) {
                String family = column.family();
                String columnName = column.column();
                if (columnName == null || "".equals(columnName)) {
                    columnName = field.getName();
                }
                field.setAccessible(true);
                String value = (String) field.get(obj);

                put.addColumn(Bytes.toBytes(family), Bytes.toBytes(columnName), Bytes.toBytes(value));
            }
        }

        // 增加数据
        table.put(put);
        // 关闭表
        table.close();
    }


    /**
     * 增加多条数据
     * @param name
     * @param puts
     * @throws IOException
     */
    protected void putData(String name, List<Put> puts) throws IOException {
        // 获取表对象
        Connection conn = getConnection();
        Table table = conn.getTable(TableName.valueOf(name));
        // 增加数据
        table.put(puts);
        // 关闭表
        table.close();
    }

    /**
     * 增加单条数据
     *
     * @param name
     * @param put
     */
    protected void putData(String name, Put put) throws IOException {
        List<Put> puts = Arrays.asList(put);
        putData(name, puts);
    }


    /**
     * 删除表
     *
     * @param name
     * @throws IOException
     */
    protected void deleteTable(String name) throws IOException {
        TableName tableName = TableName.valueOf(name);
        Admin admin = getAdmin();
        admin.disableTable(tableName);
        admin.deleteTable(tableName);
    }


    /**
     * 创建命名空间，如果已经存在，不需要创建，否则创建新的
     */
    protected void createNamespaceNX(String namespace) throws IOException {
        Admin admin = getAdmin();

        try {
            admin.getNamespaceDescriptor(namespace);
        } catch (NamespaceNotFoundException e) {
            NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(namespace).build();
            admin.createNamespace(namespaceDescriptor);
        }
    }

    /**
     * 获取链接对象
     */
    protected synchronized Connection getConnection() throws IOException {
        Connection conn = connHolder.get();
        if (conn == null) {
            Configuration conf = HBaseConfiguration.create();
            conn = ConnectionFactory.createConnection(conf);
            connHolder.set(conn);
        }
        return conn;
    }

    /**
     * 获取Admin
     */
    protected synchronized Admin getAdmin() throws IOException {
        Admin admin = adminHolder.get();
        if (admin == null) {
            admin = getConnection().getAdmin();
            adminHolder.set(admin);
        }
        return admin;
    }

}
